package com.weixin.gong.example.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.utils.VerifiableProperties;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * @author weixin.gong
 * @date 15-7-3 下午1:57
 */
public class KafkaProducer implements Partitioner {
    public static void main(String[] args) {
        long events = 6L;
        Random random = new Random();

        Properties props = new Properties();
        //kafka cluster,不需要全部的地址,最少应该填2个,避免第一个地址是非活跃的节点
        props.put("metadata.broker.list", "l-flightdev20.f.dev.cn0.qunar.com:9092,l-flightdev21.f.dev.cn0.qunar.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //实现分区策略的类,即实现Partitioner接口
        props.put("partitioner.class", KafkaProducer.class.getName());
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String key = "key" + random.nextInt(2);
            String msg = runtime + ",www.example.com," + key;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("weixin", key, msg);
            producer.send(data);
        }
        producer.close();
    }

    public KafkaProducer (VerifiableProperties props) {
        //Partitioner必须有这么个构造函数
    }
    public int partition(Object o, int i) {
        //有且只有一个，只往一个分区里写，严格保证一致性
        return 0;
    }
}
